ReplaySubject, BehaviorSubject & AsyncSubject are special types of subjects in Angular. In this tutorial let us learn what are they, how they work & how to use them in Angular
BehaviorSubjectrequires an initial value and stores the current value and emits it to the new subscribers.
import { Component, VERSION } from "@angular/core";
import { BehaviorSubject, Subject } from "rxjs";
@Component({
selector: "my-app",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent {
subject$ = new BehaviorSubject("0");
ngOnInit() {
this.subject$.subscribe(val => {
console.log("Sub1 " + val);
});
this.subject$.next("1");
this.subject$.next("2");
this.subject$.subscribe(val => {
console.log("sub2 " + val);
});
this.subject$.next("3");
this.subject$.complete();
}
}
***Result***
Sub1 0
Sub1 1
Sub1 2
sub2 2
Sub1 3
sub2 3
We create a new BehaviorSubject providing it an initial value or seed value. The BehaviorSubject stores the initial value.
subject$ = new BehaviorSubject("0");
As soon as the first subscriber subscribes to it, the BehaviorSubject emits the stored value. i.e. 0
this.subject$.subscribe(val => {
console.log("Sub1 " + val);
});
We emit two more values. The BehaviorSubject stores the last value emitted i.e. 2
this.subject$.next("1");
this.subject$.next("2");
Now, Subscriber2 subscribes to it. It immediately receives the last value stored i.e. 2
this.subject$.subscribe(val => {
console.log("sub2 " + val);
});
ReplaySubject replays old values to new subscribers when they first subscribe.
The ReplaySubject will store every value it emits in a buffer. It will emit them to the new subscribers in the order it received them. You can configure the buffer using the arguments bufferSize and windowTime
bufferSize: No of items that ReplaySubject will keep in its buffer. It defaults to infinity.
windowTime: The amount of time to keep the value in the buffer. Defaults to infinity.
First, we create a ReplaySubject
import { Component, VERSION } from "@angular/core";
import { ReplaySubject, Subject } from "rxjs";
@Component({
selector: "my-app",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent {
subject$ = new ReplaySubject();
ngOnInit() {
this.subject$.next("1");
this.subject$.next("2");
this.subject$.subscribe(
val => console.log("Sub1 " + val),
err => console.error("Sub1 " + err),
() => console.log("Sub1 Complete")
);
this.subject$.next("3");
this.subject$.next("4");
this.subject$.subscribe(val => {
console.log("sub2 " + val);
});
this.subject$.next("5");
this.subject$.complete();
this.subject$.error("err");
this.subject$.next("6");
this.subject$.subscribe(
val => {
console.log("sub3 " + val);
},
err => console.error("sub3 " + err),
() => console.log("Complete")
);
}
}
***Result***
Sub1 1
Sub1 2
Sub1 3
Sub1 4
sub2 1
sub2 2
sub2 3
sub2 4
Sub1 5
sub2 5
Sub1 Complete
sub3 1
sub3 2
sub3 3
sub3 4
sub3 5
sub3 err
First, we create a ReplaySubject
subject$ = new ReplaySubject();
ReplaySubject emits two values. It will also store these in a buffer.
this.subject$.next("1");
this.subject$.next("2");
We subscribe to it. The observer will receive 1 & 2 from the buffer
this.subject$.subscribe(
val => console.log("Sub1 " + val),
err => console.error("Sub1 " + err),
() => console.log("Sub1 Complete")
);
We subscribe again after emitting two more values. The new subscriber will also receive all the previous values.
this.subject$.next("3");
this.subject$.next("4");
this.subject$.subscribe(val => {
console.log("sub2 " + val);
});
We emit one more value & complete. All the subscribers will receive complete. They will not receive any further values or notifcations.
this.subject$.next("5");
this.subject$.complete();
We now fire an error notification and a value. None of the previous subscribers will receive this as they are already closed.
this.subject$.error("err");
this.subject$.next("6");
Now, we subscribe again. The subscriber will receive all the values up to Complete. But will not receive the Complete notification, instead, it will receive the Error notification.
this.subject$.subscribe(
val => {
console.log("sub3 " + val);
},
err => console.error("sub3 " + err),
() => console.log("Complete")
);
AsyncSubject only emits the latest value only when it completes. If it errors out then it will emit an error, but will not emit any values.
import { Component, VERSION } from "@angular/core";
import { AsyncSubject, Subject } from "rxjs";
@Component({
selector: "my-app",
templateUrl: "./app.component.html",
styleUrls: ["./app.component.css"]
})
export class AppComponent {
subject$ = new AsyncSubject();
ngOnInit() {
this.subject$.next("1");
this.subject$.next("2");
this.subject$.subscribe(
val => console.log("Sub1 " + val),
err => console.error("Sub1 " + err),
() => console.log("Sub1 Complete")
);
this.subject$.next("3");
this.subject$.next("4");
this.subject$.subscribe(val => {
console.log("sub2 " + val);
});
this.subject$.next("5");
this.subject$.complete();
this.subject$.error("err");
this.subject$.next("6");
this.subject$.subscribe(
val => console.log("Sub3 " + val),
err => console.error("sub3 " + err),
() => console.log("Sub3 Complete")
);
}
}
**Result **
Sub1 5
sub2 5
Sub1 Complete
Sub3 5
Sub3 Complete
In the above example, all the subscribers will receive the value 5 including those who subscribe after the complete event.
But if the AsyncSubject errors out, then all subscribers will receive an error notification and no value.